﻿// usocket_test.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//

#include <iostream>
#include "defaults.h"
#include "uWS.h"
#include <iostream>
#include <chrono>
#include <cmath>
#include <thread>
#include <fstream>
#include <vector>
#include <set>
#include <unordered_set>
#include <unordered_map>
#include <map>
#include <atomic>
#include "peer_connection_wsclient.h"
#include "rtc_base/json.h"
#include "JanusHandle.h"
#include "JanusTransaction.h"
#include <list>
#include "rtc_base/checks.h"
#include "rtc_base/json.h"
#include "rtc_base/logging.h"

#include "api/audio_codecs/builtin_audio_decoder_factory.h"
#include "api/audio_codecs/builtin_audio_encoder_factory.h"
#include "api/video_codecs/builtin_video_decoder_factory.h"
#include "api/video_codecs/builtin_video_encoder_factory.h"
#include "api/test/fakeconstraints.h"
#include "defaults.h"
#include "media/engine/webrtcvideocapturerfactory.h"
#include "modules/audio_device/include/audio_device.h"
#include "modules/audio_processing/include/audio_processing.h"
#include "modules/video_capture/video_capture_factory.h"
#include "peer_connection.h"


#include "rtc_base/arraysize.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/stringutils.h"
#include "rtc_base/win32socketserver.h"
#include "rtc_base/win32socketinit.h"

#include "third_party/libyuv/include/libyuv/convert_argb.h"

#if defined(WEBRTC_WIN)
#include "rtc_base/win32.h"
#endif  // WEBRTC_WIN


using namespace std;
using namespace rtc;

// Payload passed through UIThreadCallback for the SET_REMOTE_ANSWER and
// SET_REMOTE_OFFER messages: which Janus handle the description belongs to
// and the SDP text itself.
struct REMOTE_SDP_INFO {
    long long handleId;    // Janus handle id the SDP is addressed to
    std::string jsep_str;  // SDP text read from the "jsep" / "sdp" field
};


static std::string OptString(std::string message, list<string> keyList) {
    //parse json
    Json::Reader reader;
    Json::Value jmessage;
    if (!reader.parse(message, jmessage)) {
        RTC_LOG(WARNING) << "Received unknown message. " << message;
        return std::string("");//FIXME should return by another param with type enum
    }
    Json::Value jvalue = jmessage;
    Json::Value jvalue2;
    for (auto key : keyList) {
        if (rtc::GetValueFromJsonObject(jvalue, key,
            &jvalue2)) {
            jvalue = jvalue2;
        }
        else {
            return std::string("");
        }
    }
    std::string tmp_str;
    rtc::GetStringFromJson(jvalue, &tmp_str);
    //std::string tmp_str = rtc::JsonValueToString(jvalue);//this result sdp parse error beacause /r/n
    return tmp_str;
}
// Parses `message` as JSON, walks down the object members named in `keyList`
// (outermost first) and returns the JSON value found at the end of the path.
//
// Returns a null Json::Value when the message is not valid JSON or when any
// key on the path is missing. (The previous `return NULL;` produced an
// integer-0 value on MSVC and is ambiguous on other toolchains.)
// FIXME: failure should be reported separately (e.g. an enum out-parameter).
static
Json::Value optJSONValue(std::string message, list<string> keyList) {
    Json::Reader reader;
    Json::Value jmessage;
    if (!reader.parse(message, jmessage)) {
        RTC_LOG(WARNING) << "Received unknown message. " << message;
        return Json::Value();
    }

    Json::Value jvalue = jmessage;
    for (const auto& key : keyList) {
        Json::Value child;
        if (!rtc::GetValueFromJsonObject(jvalue, key, &child)) {
            return Json::Value();
        }
        jvalue = child;
    }
    return jvalue;
}


static long long int OptLLInt(std::string message, list<string> keyList) {
    //parse json
    Json::Reader reader;
    Json::Value jmessage;
    if (!reader.parse(message, jmessage)) {
        RTC_LOG(WARNING) << "Received unknown message. " << message;
        return 0;//FIXME should return by another param with type enum
    }
    Json::Value jvalue = jmessage;
    Json::Value jvalue2;
    for (auto key : keyList) {
        if (rtc::GetValueFromJsonObject(jvalue, key,
            &jvalue2)) {
            jvalue = jvalue2;
        }
        else {
            return 0;
        }
    }
    std::string tmp_str = rtc::JsonValueToString(jvalue);
    return std::stoll(tmp_str);
}


// Signaling client for a Janus gateway reached over WebSocket.
//
// Flow implemented below: when the socket reports that Janus is connected a
// session is created, a "janus.plugin.videoroom" handle is attached and the
// room is joined as a publisher. Every publisher announced in a plugin event
// gets its own handle, which joins as a subscriber. One PeerConnection wrapper
// is kept per handle in m_peer_connection_map.
//
// Pending requests are tracked in m_transactionMap, keyed by the random
// transaction id sent to Janus; replies are dispatched in OnMessageFromJanus.
class VideoRoomClient :
    public PeerConnectionWsClientObserver ,
    public PeerConnectionCallback ,
    public sigslot::has_slots<>{
public:

    // Creates the WebSocket client and starts the three WebRTC threads that
    // are later handed to the PeerConnectionFactory.
    VideoRoomClient() : mWsClient(new PeerConnectionWsClient()) {
        m_network_thread = rtc::Thread::CreateWithSocketServer();
        m_network_thread->SetName("network_thread", nullptr);
        m_network_thread->Start();

        m_worker_thread = rtc::Thread::Create();
        m_worker_thread->SetName("worker_thread", nullptr);
        m_worker_thread->Start();

        m_signaling_thread = rtc::Thread::Create();
        m_signaling_thread->SetName("signaling_thread", nullptr);
        m_signaling_thread->Start();
    }

    // Prepares a "configure" request carrying a bitrate cap for `handleId` and
    // registers its transaction.
    // NOTE(review): the request is built but NOT sent - the send call is
    // commented out below, as it was originally. The registered transaction
    // therefore never receives a reply; confirm whether sending should be
    // enabled (via mWsClient) or the registration dropped.
    void SendBitrateConstraint(long long int handleId) {
        std::string transactionID = RandomString(12);
        std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
        jt->transactionId = transactionID;
        jt->Success = [](const std::string& /*message*/) {
        };

        jt->Event = [](const std::string& message) {
            std::list<std::string> resultList = { "plugindata","data","result" };
            std::string result = OptString(message, resultList);
            if (result != "ok") {
                // The bitrate setting was not applied.
            }
        };

        jt->Error = [](const std::string&, const std::string&) {
            RTC_LOG(INFO) << "SendBitrateConstraint error:";
        };

        m_transactionMap[transactionID] = jt;

        Json::StyledWriter writer;
        Json::Value jmessage;
        Json::Value jbody;

        jbody["bitrate"] = 128000;
        jbody["request"] = "configure";

        jmessage["janus"] = "message";
        jmessage["body"] = jbody;
        jmessage["transaction"] = transactionID;
        jmessage["session_id"] = m_SessionId;
        jmessage["handle_id"] = handleId;
        //client_->SendToJanusAsync(writer.write(jmessage));
    }

    // Enumerates the video capture devices and returns a capturer for the
    // first one that can be opened, or nullptr when none is available.
    std::unique_ptr<cricket::VideoCapturer> OpenVideoCaptureDevice() {
        std::vector<std::string> device_names;
        {
            std::unique_ptr<webrtc::VideoCaptureModule::DeviceInfo> info(
                webrtc::VideoCaptureFactory::CreateDeviceInfo());
            if (!info) {
                return nullptr;
            }
            int num_devices = info->NumberOfDevices();
            for (int i = 0; i < num_devices; ++i) {
                const uint32_t kSize = 256;
                char name[kSize] = { 0 };
                char id[kSize] = { 0 };
                if (info->GetDeviceName(i, name, kSize, id, kSize) != -1) {
                    device_names.push_back(name);
                }
            }
        }

        cricket::WebRtcVideoDeviceCapturerFactory factory;
        std::unique_ptr<cricket::VideoCapturer> capturer;
        for (const auto& name : device_names) {
            capturer = factory.Create(cricket::Device(name, 0));
            if (capturer) {
                break;
            }
        }
        return capturer;
    }

    // Adds a local audio track and, when a capture device can be opened, a
    // local video track to the PeerConnection of `handleId`. Does nothing if
    // the handle has no PeerConnection or if senders were already added.
    void AddTracks(long long int handleId) {
        auto it = m_peer_connection_map.find(handleId);
        if (it == m_peer_connection_map.end() || it->second.get() == nullptr ||
            it->second->peer_connection_ == nullptr) {
            // operator[] would have inserted a null entry and crashed here.
            RTC_LOG(LS_ERROR) << "AddTracks: no PeerConnection for handle " << handleId;
            return;
        }
        PeerConnection* conn = it->second.get();

        if (!conn->peer_connection_->GetSenders().empty()) {
            return;  // Already added tracks.
        }

        rtc::scoped_refptr<webrtc::AudioTrackInterface> audio_track(
            peer_connection_factory_->CreateAudioTrack(
                kAudioLabel, peer_connection_factory_->CreateAudioSource(
                    cricket::AudioOptions())));
        auto result_or_error = conn->peer_connection_->AddTrack(audio_track, { kStreamId });
        if (!result_or_error.ok()) {
            RTC_LOG(LS_ERROR) << "Failed to add audio track to PeerConnection: "
                << result_or_error.error().message();
        }

        std::unique_ptr<cricket::VideoCapturer> video_device =
            OpenVideoCaptureDevice();
        if (video_device) {
            webrtc::FakeConstraints constraints;
            std::list<std::string> keyList = { webrtc::MediaConstraintsInterface::kMinWidth, webrtc::MediaConstraintsInterface::kMaxWidth,
                webrtc::MediaConstraintsInterface::kMinHeight, webrtc::MediaConstraintsInterface::kMaxHeight,
                webrtc::MediaConstraintsInterface::kMinFrameRate, webrtc::MediaConstraintsInterface::kMaxFrameRate,
                webrtc::MediaConstraintsInterface::kMinAspectRatio, webrtc::MediaConstraintsInterface::kMaxAspectRatio };

            // Media constraints. The values must be strings: assigning an int
            // to a std::string selects operator=(char) and stores a single
            // (garbage) character instead of the decimal text.
            std::map<std::string, std::string> opts;
            opts[webrtc::MediaConstraintsInterface::kMaxFrameRate] = "18";
            opts[webrtc::MediaConstraintsInterface::kMaxWidth] = "1280";
            opts[webrtc::MediaConstraintsInterface::kMaxHeight] = "720";

            for (const auto& key : keyList) {
                auto opt = opts.find(key);
                if (opt != opts.end()) {
                    constraints.AddMandatory(key, opt->second);
                }
            }
            // NOTE(review): `constraints` is built but the video source is
            // still created with nullptr constraints, exactly as before.
            // Confirm whether &constraints should be passed here.
            rtc::scoped_refptr<webrtc::VideoTrackInterface> video_track_(
                peer_connection_factory_->CreateVideoTrack(
                    kVideoLabel, peer_connection_factory_->CreateVideoSource(
                        std::move(video_device), nullptr)));
            //main_wnd_->StartLocalRenderer(video_track_);
            //
            //m_peer_connection_map[handleId]->StartRenderer(MainWnd_, video_track_);

            result_or_error = conn->peer_connection_->AddTrack(video_track_, { kStreamId });
            if (!result_or_error.ok()) {
                RTC_LOG(LS_ERROR) << "Failed to add video track to PeerConnection: "
                    << result_or_error.error().message();
            }
        }
        else {
            RTC_LOG(LS_ERROR) << "OpenVideoCaptureDevice failed";
        }
    }

    // Creates the PeerConnection for `handleId` and stores it in
    // m_peer_connection_map. Returns false when the handle already has an
    // entry or when the factory fails to create the connection; in the latter
    // case nothing is registered.
    bool CreatePeerConnection(long long int handleId, bool dtls) {
        RTC_DCHECK(peer_connection_factory_);
        if (m_peer_connection_map.find(handleId) != m_peer_connection_map.end()) {
            //existed
            return false;
        }

        webrtc::PeerConnectionInterface::RTCConfiguration config;
        config.tcp_candidate_policy = webrtc::PeerConnectionInterface::TcpCandidatePolicy::kTcpCandidatePolicyDisabled;
        config.bundle_policy = webrtc::PeerConnectionInterface::BundlePolicy::kBundlePolicyMaxBundle;
        config.rtcp_mux_policy = webrtc::PeerConnectionInterface::RtcpMuxPolicy::kRtcpMuxPolicyRequire;
        config.continual_gathering_policy = webrtc::PeerConnectionInterface::ContinualGatheringPolicy::GATHER_CONTINUALLY;
        config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
        config.enable_dtls_srtp = dtls;
        //additional setting
        if (!config.prerenderer_smoothing()) {
            config.set_prerenderer_smoothing(true);
        }
        config.disable_ipv6 = true;
        config.enable_rtp_data_channel = false;
        webrtc::PeerConnectionInterface::IceServer server;
        server.uri = GetPeerConnectionString();
        config.servers.push_back(server);

        webrtc::PeerConnectionInterface::BitrateParameters bitrateParam;
        bitrateParam.min_bitrate_bps = absl::optional<int>(512000);
        bitrateParam.current_bitrate_bps = absl::optional<int>(1256000);
        bitrateParam.max_bitrate_bps = absl::optional<int>(1512000);

        rtc::scoped_refptr<PeerConnection> peer_connection(
            new rtc::RefCountedObject<PeerConnection>());

        peer_connection->peer_connection_ = peer_connection_factory_->CreatePeerConnection(
            config, nullptr, nullptr, peer_connection);
        if (peer_connection->peer_connection_ == nullptr) {
            // Previously SetBitrate() below dereferenced the null result.
            RTC_LOG(LS_ERROR) << "CreatePeerConnection failed for handle " << handleId;
            return false;
        }
        //set max/min bitrate
        peer_connection->peer_connection_->SetBitrate(bitrateParam);
        //add to the map
        peer_connection->RegisterObserver(this);
        peer_connection->SetHandleId(handleId);
        m_peer_connection_map[handleId] = peer_connection;

        return true;
    }

    // Lazily creates the PeerConnectionFactory, then creates the
    // PeerConnection for `handleId`; local tracks are added only when
    // `bPublisher` is true. Returns false if the handle already has an entry
    // or if any creation step fails.
    bool InitializePeerConnection(long long int handleId, bool bPublisher) {
        if (m_peer_connection_map.find(handleId) != m_peer_connection_map.end()) {
            //existing
            return false;
        }

        if (!peer_connection_factory_) {
            peer_connection_factory_ = webrtc::CreatePeerConnectionFactory(
                this->m_network_thread.get() /* network_thread */, 
                this->m_worker_thread.get() /* worker_thread */,
                this->m_signaling_thread.get() /* signaling_thread */,
                nullptr /* default_adm */,
                webrtc::CreateBuiltinAudioEncoderFactory(),
                webrtc::CreateBuiltinAudioDecoderFactory(),
                webrtc::CreateBuiltinVideoEncoderFactory(),
                webrtc::CreateBuiltinVideoDecoderFactory(), nullptr /* audio_mixer */,
                nullptr /* audio_processing */);
        }

        if (!peer_connection_factory_) {
            std::cout << "Failed to initialize PeerConnectionFactory" << std::endl;
            DeletePeerConnection(handleId);
            return false;
        }

        if (!CreatePeerConnection(handleId,/*dtls=*/true)) {
            std::cout << "CreatePeerConnection failed" << std::endl;
            DeletePeerConnection(handleId);
            // Previously execution continued and dereferenced a missing entry.
            return false;
        }
        //subscriber no need local tracks(audio and video)
        if (bPublisher) {
            AddTracks(handleId);
        }
        PeerConnection* conn = m_peer_connection_map[handleId].get();
        // NOTE(review): the flag is set to true even for subscribers, as in
        // the original code. It looks like it should be `bPublisher`; confirm
        // against how PeerConnection uses b_publisher_ before changing it.
        conn->b_publisher_ = true;

        return conn->peer_connection_ != nullptr;
    }

    // Stops rendering and releases the underlying PeerConnection of
    // `handleId`. The map entry itself is kept. Unknown handles are ignored.
    void DeletePeerConnection(long long int handleId) {
        auto it = m_peer_connection_map.find(handleId);
        if (it == m_peer_connection_map.end() || it->second.get() == nullptr) {
            // operator[] would have inserted a null entry and crashed here.
            return;
        }
        it->second->StopRenderer();
        it->second->peer_connection_ = nullptr;
        //peer_connection_factory_ = nullptr; //TODO should destroy before quit
    }

    // Registers this object as observer of the WebSocket client and connects
    // to ws://<ip>:<port>.
    void ConectToServer(std::string ip, int port) {
        auto ws_server = std::string("ws://") + ip + std::string(":") + std::to_string(port);
        mWsClient->RegisterObserver(this);
        mWsClient->Connect(ws_server, "1111");
    }

    // Dispatches a message identified by `msg_id`; the meaning of `data`
    // depends on the message:
    //   NEW_TRACK_ADDED    - NEW_TRACK*, deleted here; its track is released.
    //   TRACK_REMOVED      - webrtc::MediaStreamTrackInterface*, released here.
    //   CREATE_OFFER       - long long int* holding the handle id (not owned).
    //   SET_REMOTE_ANSWER / SET_REMOTE_OFFER
    //                      - heap-allocated REMOTE_SDP_INFO*, deleted here.
    void UIThreadCallback(int msg_id, void* data) {
        switch (msg_id) {
        case PEER_CONNECTION_CLOSED:
            RTC_LOG(INFO) << "PEER_CONNECTION_CLOSED";
            for (auto& entry : m_peer_connection_map) {
                DeletePeerConnection(entry.first);
            }
            // The missing break used to fall through into NEW_TRACK_ADDED and
            // dereference `data` as a NEW_TRACK.
            break;

        case NEW_TRACK_ADDED: {
            NEW_TRACK* pTrack = static_cast<NEW_TRACK*>(data);
            auto* track = reinterpret_cast<webrtc::MediaStreamTrackInterface*>(pTrack->pInterface);
            if (track->kind() == webrtc::MediaStreamTrackInterface::kVideoKind) {
                // Renderer hook (currently disabled):
                //auto* video_track = static_cast<webrtc::VideoTrackInterface*>(track);
                //main_wnd_->StartRemoteRenderer(video_track);
                //m_peer_connection_map[pTrack->handleId]->StartRenderer(MainWnd_, video_track);
            }
            track->Release();
            delete pTrack;
            break;
        }

        case TRACK_REMOVED: {
            // Remote peer stopped sending a track.
            auto* track = reinterpret_cast<webrtc::MediaStreamTrackInterface*>(data);
            track->Release();
            break;
        }

        case CREATE_OFFER: {
            if (data == nullptr) {
                RTC_LOG(LS_ERROR) << "CREATE_OFFER without a handle id";
                break;
            }
            long long int handleId = *static_cast<long long int*>(data);
            if (InitializePeerConnection(handleId, true)) {
                m_peer_connection_map[handleId]->CreateOffer();
            }
            else {
                std::cout << "Error" << "Failed to initialize PeerConnection";
            }
            break;
        }

        case SET_REMOTE_ANSWER: {
            // Take ownership so the payload is freed on every path.
            std::unique_ptr<REMOTE_SDP_INFO> pInfo(static_cast<REMOTE_SDP_INFO*>(data));
            if (!pInfo) {
                RTC_LOG(LS_ERROR) << "SET_REMOTE_ANSWER without payload";
                break;
            }
            long long int handleId = pInfo->handleId;
            std::unique_ptr<webrtc::SessionDescriptionInterface> session_description =
                webrtc::CreateSessionDescription(webrtc::SdpType::kAnswer, pInfo->jsep_str);
            if (!session_description) {
                RTC_LOG(LS_ERROR) << "Failed to parse remote answer for handle " << handleId;
                break;
            }
            auto it = m_peer_connection_map.find(handleId);
            if (it == m_peer_connection_map.end() || it->second.get() == nullptr) {
                RTC_LOG(LS_ERROR) << "Remote answer for unknown handle " << handleId;
                break;
            }
            it->second->SetRemoteDescription(session_description.release());
            //TODO fixme suitable here?
            SendBitrateConstraint(handleId);
            break;
        }

        case SET_REMOTE_OFFER: {
            // Take ownership so the payload is freed on every path.
            std::unique_ptr<REMOTE_SDP_INFO> pInfo(static_cast<REMOTE_SDP_INFO*>(data));
            if (!pInfo) {
                RTC_LOG(LS_ERROR) << "SET_REMOTE_OFFER without payload";
                break;
            }
            long long int handleId = pInfo->handleId;
            std::unique_ptr<webrtc::SessionDescriptionInterface> session_description =
                webrtc::CreateSessionDescription(webrtc::SdpType::kOffer, pInfo->jsep_str);
            if (!session_description) {
                RTC_LOG(LS_ERROR) << "Failed to parse remote offer for handle " << handleId;
                break;
            }
            //as subscriber
            if (InitializePeerConnection(handleId, false)) {
                m_peer_connection_map[handleId]->SetRemoteDescription(session_description.release());
                m_peer_connection_map[handleId]->CreateAnswer();
            }
            else {
                std::cout << "Error" << "Failed to initialize PeerConnection";
            }
            break;
        }

        default:
            RTC_NOTREACHED();
            break;
        }
    }

    // Sends the local offer for `handleId` to Janus in a "configure" request.
    // The plugin event answering this transaction is expected to carry the
    // remote answer, which is then applied via SET_REMOTE_ANSWER.
    void SendOffer(long long int handleId, std::string sdp_type, std::string sdp_desc) {
        std::string transactionID = RandomString(12);
        std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
        jt->transactionId = transactionID;

        jt->Event = [this, handleId](const std::string& message) {
            std::list<std::string> resultList = { "jsep","sdp" };
            std::string jsep_str = OptString(message, resultList);
            // Ownership of pInfo passes to UIThreadCallback.
            REMOTE_SDP_INFO* pInfo = new REMOTE_SDP_INFO;
            pInfo->handleId = handleId;
            pInfo->jsep_str = jsep_str;
            UIThreadCallback(SET_REMOTE_ANSWER, pInfo);
        };

        m_transactionMap[transactionID] = jt;

        Json::StyledWriter writer;
        Json::Value jmessage;
        Json::Value jbody;
        Json::Value jjsep;

        jbody["request"] = "configure";
        jbody["audio"] = true;
        jbody["video"] = true;

        jjsep["type"] = sdp_type;
        jjsep["sdp"] = sdp_desc;

        jmessage["body"] = jbody;
        jmessage["jsep"] = jjsep;
        jmessage["janus"] = "message";
        jmessage["transaction"] = transactionID;
        jmessage["session_id"] = m_SessionId;
        jmessage["handle_id"] = handleId;
        // Called from the UI side, so the send is handed over to the ws thread.
        mWsClient->SendToJanusAsync(writer.write(jmessage));
    }

    // Sends the local answer for `handleId` to Janus in a "start" request.
    void SendAnswer(long long int handleId, std::string sdp_type, std::string sdp_desc) {
        std::string transactionID = RandomString(12);
        std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
        jt->transactionId = transactionID;

        jt->Event = [](const std::string& /*message*/) {
        };

        m_transactionMap[transactionID] = jt;

        Json::StyledWriter writer;
        Json::Value jmessage;
        Json::Value jbody;
        Json::Value jjsep;

        jbody["request"] = "start";
        jbody["room"] = "1234";

        jjsep["type"] = sdp_type;
        jjsep["sdp"] = sdp_desc;

        jmessage["body"] = jbody;
        jmessage["jsep"] = jjsep;
        jmessage["janus"] = "message";
        jmessage["transaction"] = transactionID;
        jmessage["session_id"] = m_SessionId;
        jmessage["handle_id"] = handleId;
        // Called from the UI side, so the send is handed over to the ws thread.
        mWsClient->SendToJanusAsync(writer.write(jmessage));
    }

    // Forwards one local ICE candidate for `handleId` to Janus.
    void trickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate) {
        if (candidate == nullptr) {
            RTC_LOG(LS_ERROR) << "trickleCandidate called without a candidate";
            return;
        }
        std::string transactionID = RandomString(12);
        Json::StyledWriter writer;
        Json::Value jmessage;
        Json::Value jcandidate;

        std::string sdp;
        if (!candidate->ToString(&sdp)) {
            RTC_LOG(LS_ERROR) << "Failed to serialize candidate";
            return;
        }

        jcandidate["sdpMid"] = candidate->sdp_mid();
        jcandidate["sdpMLineIndex"] = candidate->sdp_mline_index();
        jcandidate["candidate"] = sdp;

        jmessage["janus"] = "trickle";
        jmessage["candidate"] = jcandidate;
        jmessage["transaction"] = transactionID;
        jmessage["session_id"] = m_SessionId;
        jmessage["handle_id"] = handleId;
        mWsClient->SendToJanusAsync(writer.write(jmessage));
    }

    // Tells Janus that candidate gathering for `handleId` has completed.
    void trickleCandidateComplete(long long int handleId) {
        std::string transactionID = RandomString(12);
        Json::StyledWriter writer;
        Json::Value jmessage;
        Json::Value jcandidate;

        jcandidate["completed"] = true;

        jmessage["janus"] = "trickle";
        jmessage["candidate"] = jcandidate;
        jmessage["transaction"] = transactionID;
        jmessage["session_id"] = m_SessionId;
        jmessage["handle_id"] = handleId;
        mWsClient->SendToJanusAsync(writer.write(jmessage));
    }

protected:
    // PeerConnection callback: a local description is ready. "offer" goes out
    // through SendOffer, everything else through SendAnswer.
    void PCSendSDP(long long int handleId, std::string sdpType, std::string sdp) {
        if (sdpType == "offer") {
            SendOffer(handleId, sdpType, sdp);
        }
        else {
            SendAnswer(handleId, sdpType, sdp);
        }
    }
    // PeerConnection callback: handled synchronously by UIThreadCallback.
    virtual void PCQueueUIThreadCallback(int msg_id, void* data) {
        std::cout << msg_id<<std::endl;
        UIThreadCallback(msg_id,
            data);
    }
    virtual void PCTrickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate) {
        std::cout << "PCTrickleCandidate" << std::endl;
        trickleCandidate(handleId, candidate);
    }

    virtual void PCTrickleCandidateComplete(long long int handleId) {
        std::cout << "PCTrickleCandidateComplete" << std::endl;
        trickleCandidateComplete(handleId);
    }


    virtual void OnSignedIn() {
        std::cout << "siginedIn called\r\n";
    }
    virtual void OnDisconnected() {
        std::cout << "OnDisconnected\r\n";
    }
    virtual void OnPeerConnected(int id, const std::string& name) {
        std::cout << "peerconnect" << id << name << "\r\n";
    }

    // Parses one message received from Janus and dispatches it on its "janus"
    // field. "success" and "error" complete (and remove) the matching
    // transaction; "event" attaches to any announced publishers and then
    // notifies the matching transaction, which stays registered.
    virtual void OnMessageFromJanus(int peer_id, const std::string& message) {
        RTC_DCHECK(!message.empty());
        RTC_LOG(INFO) << "got msg: " << message;
        //TODO make sure in right state
        Json::Reader reader;
        Json::Value jmessage;
        if (!reader.parse(message, jmessage)) {
            RTC_LOG(WARNING) << "Received unknown message. " << message;
            return;
        }

        std::string janus_str;
        rtc::GetStringFromJsonObject(jmessage, "janus",
            &janus_str);
        if (janus_str.empty()) {
            return;
        }

        if (janus_str == "ack") {
            // Just an ack, we can probably ignore
            RTC_LOG(INFO) << "Got an ack on session. ";
        }
        else if (janus_str == "success") {
            std::string transaction_id;
            rtc::GetStringFromJsonObject(jmessage, "transaction",
                &transaction_id);
            // find() instead of at(): an unknown transaction must not throw.
            std::shared_ptr<JanusTransaction> jt = FindTransaction(transaction_id);
            if (jt && jt->Success) {
                jt->Success(message);//handle_id not ready yet
            }
            m_transactionMap.erase(transaction_id);
        }
        else if (janus_str == "trickle") {
            RTC_LOG(INFO) << "Got a trickle candidate from Janus. ";
        }
        else if (janus_str == "webrtcup") {
            RTC_LOG(INFO) << "The PeerConnection with the gateway is up!";
        }
        else if (janus_str == "hangup") {
            RTC_LOG(INFO) << "A plugin asked the core to hangup a PeerConnection on one of our handles! ";
        }
        else if (janus_str == "detached") {
            RTC_LOG(INFO) << "A plugin asked the core to detach one of our handles! ";
        }
        else if (janus_str == "media") {
            RTC_LOG(INFO) << "Media started/stopped flowing. ";
        }
        else if (janus_str == "slowlink") {
            RTC_LOG(INFO) << "Got a slowlink event! ";
        }
        else if (janus_str == "error") {
            RTC_LOG(INFO) << "Got an error. ";
            // Oops, something wrong happened
            std::string transaction_id;
            rtc::GetStringFromJsonObject(jmessage, "transaction",
                &transaction_id);
            std::shared_ptr<JanusTransaction> jt = FindTransaction(transaction_id);
            if (jt && jt->Error) {
                jt->Error("123","456");//TODO need to parse the error code and desc
            }
            m_transactionMap.erase(transaction_id);
        }
        else if (janus_str == "event") {
            RTC_LOG(INFO) << "Got a plugin event! ";
            //get publishers
            std::list<std::string> str_publishers{ "plugindata" ,"data","publishers" };
            Json::Value value_publishers = optJSONValue(message, str_publishers);
            std::vector<Json::Value> PublisherVec;

            rtc::JsonArrayToValueVector(value_publishers, &PublisherVec);
            //constrain the max publishers count to 5
            for (const auto& pub : PublisherVec) {
                Json::Value jid;
                if (!rtc::GetValueFromJsonObject(pub, "id", &jid) || !jid.isInt64()) {
                    // std::stoll on the serialised value used to throw here.
                    RTC_LOG(WARNING) << "Publisher entry without an integer id; skipped.";
                    continue;
                }
                std::string display;
                rtc::GetStringFromJsonObject(pub, "display", &display);
                long long int feedId = jid.asInt64();
                CreateHandle("janus.plugin.videoroom", feedId, display);
            }

            std::string transaction_id;
            if (rtc::GetStringFromJsonObject(jmessage, "transaction",
                &transaction_id)) {
                std::shared_ptr<JanusTransaction> jt = FindTransaction(transaction_id);
                if (jt && jt->Event) {
                    jt->Event(message);
                }
            }
        }
    }
    virtual void OnMessageSent(int err) {
        std::cout << "message sent" << err << "\r\n";
    }
    virtual void OnServerConnectionFailure() {
        std::cout << "OnServerConnectionFailure" << "\r\n";
    }
    virtual void OnJanusConnected() {
        std::cout << "OnJanusConnected" << "\r\n";
        // Establish the Janus session.
        this->CreateSession();
    }
    virtual void OnJanusDisconnected() {
        std::cout << "OnJanusDisconnected" << "\r\n";
    }
    virtual void OnSendKeepAliveToJanus() {
        std::cout << "OnSendKeepAliveToJanus" << "\r\n";
        KeepAlive();
    }

private:

    // Returns the pending transaction registered under `transactionId`, or an
    // empty pointer when there is none.
    std::shared_ptr<JanusTransaction> FindTransaction(const std::string& transactionId) const {
        auto it = m_transactionMap.find(transactionId);
        if (it == m_transactionMap.end()) {
            return std::shared_ptr<JanusTransaction>();
        }
        return it->second;
    }

    // Sends a "keepalive" for the current session; does nothing before a
    // session id has been obtained. (A throw-away thread that was started and
    // destroyed on every call has been removed - nothing used it.)
    void KeepAlive()
    {
        if (m_SessionId > 0) {
            std::string transactionID = RandomString(12);
            Json::StyledWriter writer;
            Json::Value jmessage;

            jmessage["janus"] = "keepalive";
            jmessage["session_id"] = m_SessionId;
            jmessage["transaction"] = transactionID;
            mWsClient->SendToJanus(writer.write(jmessage));
        }
    }

    // Joins through the plugin attached to `handleId`.
    // videoroom: joins room 1234 as publisher when `feedId` is 0, otherwise as
    // subscriber of that feed; the answering event triggers offer creation
    // ("joined") or remote-offer handling ("attached").
    // echotest: sends the request and creates the offer right away.
    void JoinRoom(std::string pluginName, long long int handleId, long long int feedId) {
        //rtcEvents.onPublisherJoined(handle.handleId);
        std::string transactionID = RandomString(12);
        std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
        jt->transactionId = transactionID;

        jt->Event = [this, handleId](const std::string& message) {
            std::list<std::string> resultList = { "plugindata","data","result" };
            std::list<std::string> roomList = { "plugindata","data","videoroom" };

            //echotest return result=ok
            std::string result = OptString(message, resultList);
            if (result == "ok") {
                RTC_LOG(WARNING) << "echotest negotiation ok! ";
            }

            std::string videoroom = OptString(message, roomList);
            //joined the room as a publisher
            if (videoroom == "joined") {
                long long int offerHandleId = handleId;
                UIThreadCallback(CREATE_OFFER, &offerHandleId);
                //for each search every publisher and create handle to attach them
            }
            //joined the room as a subscriber
            if (videoroom == "attached") {
                //TODO make sure this sdp is offer from remote peer
                std::list<std::string> sdpPath = { "jsep","sdp" };
                std::string jsep_str = OptString(message, sdpPath);
                // Ownership of pInfo passes to UIThreadCallback.
                REMOTE_SDP_INFO* pInfo = new REMOTE_SDP_INFO;
                pInfo->handleId = handleId;
                pInfo->jsep_str = jsep_str;
                UIThreadCallback(SET_REMOTE_OFFER, pInfo);
            }
        };

        m_transactionMap[transactionID] = jt;

        Json::StyledWriter writer;
        Json::Value jmessage;
        Json::Value jbody;
        if (pluginName == "janus.plugin.videoroom") {
            jbody["request"] = "join";
            jbody["room"] = 1234;//FIXME should be variable
            if (feedId == 0) {
                jbody["ptype"] = "publisher";
                jbody["display"] = "pcg";//FIXME should be variable
            }
            else {
                jbody["ptype"] = "subscriber";
                jbody["feed"] = feedId;
                jbody["private_id"] = 0;//FIXME should be variable
            }
            jmessage["body"] = jbody;
            jmessage["janus"] = "message";
            jmessage["transaction"] = transactionID;
            jmessage["session_id"] = m_SessionId;
            jmessage["handle_id"] = handleId;
            mWsClient->SendToJanus(writer.write(jmessage));
            //After joined,Then create offer
        }
        else if (pluginName == "janus.plugin.audiobridge") {

        }
        else if (pluginName == "janus.plugin.echotest") {
            jbody["audio"] = true;
            jbody["video"] = true;
            jmessage["body"] = jbody;
            jmessage["janus"] = "message";
            jmessage["transaction"] = transactionID;
            jmessage["session_id"] = m_SessionId;
            jmessage["handle_id"] = handleId;
            mWsClient->SendToJanus(writer.write(jmessage));
            // CREATE_OFFER expects a pointer to the handle id. The id itself
            // used to be cast to void* and was then dereferenced as an address.
            UIThreadCallback(CREATE_OFFER, &handleId);
        }
    }

    // Attaches a new handle for `pluginName` to the current session. On
    // success the handle is recorded in m_handleMap and JoinRoom is called
    // with `feedId` (0 = publish, otherwise the feed to subscribe to).
    void CreateHandle(std::string pluginName, long long int feedId, std::string display) {
        std::string transactionID = RandomString(12);
        std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
        jt->transactionId = transactionID;
        jt->Success = [this, pluginName, feedId, display](const std::string& message) {
            std::list<std::string> handleList = { "data","id" };
            long long int handle_id = OptLLInt(message, handleList);
            //add handle to the map
            std::shared_ptr<JanusHandle> jh(new JanusHandle());
            jh->handleId = handle_id;
            jh->display = display;
            jh->feedId = feedId;
            m_handleMap[handle_id] = jh;
            JoinRoom(pluginName, handle_id, feedId);//TODO feedid means nothing in echotest,else?
        };

        jt->Event = [](const std::string& /*message*/) {
        };

        jt->Error = [](const std::string&, const std::string&) {
            RTC_LOG(INFO) << "CreateHandle error:";
        };

        m_transactionMap[transactionID] = jt;

        Json::StyledWriter writer;
        Json::Value jmessage;
        jmessage["janus"] = "attach";
        jmessage["plugin"] = pluginName;
        jmessage["transaction"] = transactionID;
        jmessage["session_id"] = m_SessionId;
        mWsClient->SendToJanus(writer.write(jmessage));
    }

    // Asks Janus to create a session. On success the session id is stored and
    // the publisher handle for the videoroom plugin is attached.
    void CreateSession() {
        std::string transactionID = RandomString(12);
        std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
        jt->transactionId = transactionID;

        jt->Success = [this](const std::string& message) {
            std::list<std::string> sessionList = { "data","id" };
            m_SessionId = OptLLInt(message, sessionList);
            //launch the timer for keep alive heartbeat
            //Then Create the handle
            CreateHandle("janus.plugin.videoroom", 0, "pcg");
        };

        jt->Error = [](const std::string& code, const std::string& reason) {
            RTC_LOG(INFO) << "Ooops: " << code << " " << reason;
        };

        m_transactionMap[transactionID] = jt;
        Json::StyledWriter writer;
        Json::Value jmessage;
        jmessage["janus"] = "create";
        jmessage["transaction"] = transactionID;
        mWsClient->SendToJanus(writer.write(jmessage));
    }

    // WebSocket transport to Janus (owned).
    std::unique_ptr<PeerConnectionWsClient> mWsClient;
    // Janus session id; 0 until CreateSession succeeds.
    long long int m_SessionId = 0;
    // Pending requests keyed by transaction id.
    std::map<std::string, std::shared_ptr<JanusTransaction>> m_transactionMap;
    // Attached plugin handles keyed by handle id.
    std::map<long long int, std::shared_ptr<JanusHandle>> m_handleMap;
    // The threads are declared before the factory and the connections so that
    // they are destroyed after them (members are destroyed in reverse order).
    std::unique_ptr<rtc::Thread> m_signaling_thread;
    std::unique_ptr<rtc::Thread> m_worker_thread;
    std::unique_ptr<rtc::Thread> m_network_thread;
    rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> peer_connection_factory_;
    // PeerConnection wrappers keyed by handle id.
    std::map<long long int, rtc::scoped_refptr<PeerConnection>> m_peer_connection_map;


};

// Observer type that adds nothing of its own: it only combines the sigslot
// slot holder with the WebSocket client observer interface.
class ImplPeerConnectionWsClientObserver : 
    public sigslot::has_slots<>,
    public PeerConnectionWsClientObserver {
public:
    ImplPeerConnectionWsClientObserver() {}
};


// Entry point: initialises Winsock, installs a Win32 socket-server thread as
// the current rtc thread, connects a VideoRoomClient to the Janus gateway and
// then idles forever.
//
// The network/worker/signaling threads that used to be created here were
// never used (VideoRoomClient creates and owns its own set), so they have
// been removed. The client is now a plain local object instead of a leaked
// raw `new`.
int main()
{
    rtc::EnsureWinsockInit();
    rtc::Win32SocketServer w32_ss;
    rtc::Win32Thread w32_thread(&w32_ss);
    rtc::ThreadManager::Instance()->SetCurrentThread(&w32_thread);

    VideoRoomClient client;
    client.ConectToServer("janusdemo.com", 8188);

    // Keep the process alive; the loop itself does no work.
    while (true) {
        Sleep(1000);
    }
}

